Skip to content

💎Kafka面试题集合

Kafka入门

什么是消息队列与Kafka简介

消息队列(Message Queue,简称MQ),指保存消息的一个容器,本质是个队列。

消息(Message)是指在应用之间传送的数据,消息可以非常简单,比如只包含文本字符串,也可以更复杂,可能包含嵌入对象。

消息队列(Message Queue)是一种应用间的通信方式,消息发送后可以立即返回,有消息系统来确保信息的可靠专递,消息发布者只管把消息发布到MQ中而不管谁来取,消息使用者只管从MQ中取消息而不管谁发布的,这样发布者和使用者都不用知道对方的存在。

20251206233441420fa7d70.png

  • Producer:消息生产者,负责产生和发送消息到 Broker;
  • Broker:消息处理中心。负责消息存储、确认、重试等,一般其中会包含多个 queue;
  • Consumer:消息消费者,负责从 Broker 中获取消息,并进行相应处理;

为什么需要消息队列?

1、屏蔽异构平台的细节:发送方、接收方系统之间不需要了解双方,只需认识消息。

2、异步:消息堆积能力;发送方接收方不需同时在线,发送方接收方不需同时扩容(削峰)。

3、解耦:防止引入过多的API给系统的稳定性带来风险;调用方使用不当会给被调用方系统造成压力,被调用方处理不当会降低调用方系统的响应能力。

4、复用:一次发送多次消费。

5、可靠:一次保证消息的传递。如果发送消息时接收者不可用,消息队列会保留消息,直到成功地传递它。

6、提供路由:发送者无需与接收者建立连接,双方通过消息队列保证消息能够从发送者路由到接收者,甚至对于本来网络不易互通的两个服务,也可以提供消息路由。

消息队列有什么优点和缺点?

  1. 核心优点
    1. 解耦
    2. 异步
    3. 削峰
  2. 缺点
    1. 系统可用性降低:系统引入的外部依赖越多,越容易挂掉。
    2. 系统复杂度提高了
    3. 一致性问题:消息传递给多个系统,部分执行成功,部分执行失败,容易导致数据不一致

Kafka简介

Kafka是一个分布式流处理系统,流处理系统使它可以像消息队列一样publish或者subscribe消息,分布式提供了容错性,并发处理消息的机制。

Kafka的优势和特点

  • 高吞吐量:单机每秒处理几十上百万的消息量。即使存储了许多TB的消息,它也保持稳定的性能。
  • 高性能:单节点支持上千个客户端,并保证零停机和零数据丢失,异步化处理机制
  • **持久化:**将消息持久化到磁盘。通过将数据持久化到硬盘以及replica(follower节点)防止数据丢失。
  • 零拷贝:减少了很多的拷贝技术,以及可以总体减少阻塞事件,提高吞吐量。
  • **可靠性 :**Kafka是分布式,分区,复制和容错的。
  • Kafka的特点 :
    • 顺序读,顺序写
    • 利用Linux的页缓存
    • 分布式系统,易于向外扩展。所有的Producer、Broker和Consumer都会有多个,均为分布式的。无需停机即可扩展机器。多个Producer、Consumer可能是不同的应用。
    • 客户端状态维护:消息被处理的状态是在Consumer端维护,而不是由server端维护。当失败时能自动平衡。
    • 支持online(在线)和offline(离线)的场景。
    • 支持多种客户端语言。Kafka支持Java、.NET、PHP、Python等多种语言。

Kafka与传统消息队列的对比

2025120623344155d93870e.png

各种对比之后,有如下建议:

  • ActiveMQ,没经过大规模吞吐量场景的验证,社区也不是很活跃,所以不推荐;
  • RabbitMQ,虽然erlang 语言阻止了大量的 Java 工程师去深入研究和掌控它,对公司而言,几乎处于不可控的状态,但是毕竟是开源的,比较稳定的支持,活跃度也高,推荐中小型公司使用;推荐
  • RocketMQ,阿里出品,Java语言编写,经过了阿里多年双十一大促的考验,性能和稳定性得到了充分的严重。目前在业界被广泛应用在订单,交易,充值,流计算,消息推送,日志流式处理,binlog分发等场景;强烈推荐
  • Kafka,如果是大数据领域的实时计算、日志采集等场景,用 Kafka 是业内标准的,绝对没问题,社区活跃度很高,绝对不会黄,何况几乎是全世界这个领域的事实性规范。

Kafka的架构设计

20251206233441a46a98db3.png

kafka运行在集群上,集群包含一个或多个服务器。kafka把消息存在topic中,每一条消息包含键值(key),值(value)和时间戳(timestamp)。

kafka有以下一些基本概念:

Producer** **- 消息生产者,就是向kafka broker发消息的客户端。

Consumer** **- 消息消费者,是消息的使用方,负责消费Kafka服务器上的消息。

Topic** **- 主题,由用户定义并配置在Kafka服务器,用于建立Producer和Consumer之间的订阅关系。生产者发送消息到指定的Topic下,消息者从这个Topic下消费消息。

Partition - 消息分区,一个topic可以分为多个 partition,每个partition是一个有序的队列。partition中的每条消息都会被分配一个有序的id(offset)。

Broker** **- 一台kafka服务器就是一个broker。一个集群由多个broker组成。一个broker可以容纳多个topic。

Consumer Group - 消费者分组,用于归组同类消费者。每个consumer属于一个特定的consumer group,多个消费者可以共同消息一个Topic下的消息,每个消费者消费其中的部分消息,这些消费者就组成了一个分组,拥有同一个分组名称,通常也被称为消费者集群。

**Offset **- 消息在partition中的偏移量。每一条消息在partition都有唯一的偏移量,消息者可以指定偏移量来指定要消费的消息。

基本概念:

消息和批次

消息 ,Kafka里的数据单元,也就是我们一般消息中间件里的消息的概念(可以比作数据库中一条记录)。消息由字节数组组成。消息还可以包含键(可选元数据,也是字节数组),主要用于对消息选取分区。

作为一个高效的消息系统,为了提高效率,消息可以被分批写入Kafka。批次就是一组消息,这些消息属于同一个主题和分区。如果只传递单个消息,会导致大量的网络开销,把消息分成批次传输可以减少这开销。但是,这个需要权衡(时间延迟和吞吐量之间),批次里包含的消息越多,单位时间内处理的消息就越多,单个消息的传输时间就越长(吞吐量高延时也高)。如果进行压缩,可以提升数据的传输和存储能力,但需要更多的计算处理。

对于Kafka来说,消息是晦涩难懂的字节数组,一般我们使用序列化和反序列化技术,格式常用的有JSON和XML,还有Avro(Hadoop开发的一款序列化框架),具体怎么使用依据自身的业务来定。

主题和分区

Kafka里的消息用主题进行分类(主题好比数据库中的表),主题下有可以被分为若干个 分区(分表技术) 。分区本质上是个提交日志文件,有新消息,这个消息就会以追加的方式写入分区(写文件的形式),然后用先入先出的顺序读取。

2025120623344123b2e4dbf.png

但是因为主题会有多个分区,所以在整个主题的范围内,是无法保证消息的顺序的,单个分区则可以保证。

Kafka通过分区来实现数据冗余和伸缩性,因为分区可以分布在不同的服务器上,那就是说一个主题可以跨越多个服务器(这是Kafka高性能的一个原因,多台服务器的磁盘读写性能比单台更高)。

前面我们说Kafka可以看成一个流平台,很多时候,我们会把一个主题的数据看成一个流,不管有多少个分区。

生产者和消费者、偏移量、消费者群组

就是一般消息中间件里生产者和消费者的概念。一些其他的高级客户端API,像数据管道API和流式处理的Kafka Stream,都是使用了最基本的生产者和消费者作为内部组件,然后提供了高级功能。

生产者默认情况下把消息均衡分布到主题的所有分区上,如果需要指定分区,则需要使用消息里的消息键和分区器。

消费者订阅主题,一个或者多个,并且按照消息的生成顺序读取。消费者通过检查所谓的偏移量来区分消息是否读取过。偏移量是一种元数据,一个不断递增的整数值,创建消息的时候,Kafka会把他加入消息。在一个主题中一个分区里,每个消息的偏移量是唯一的。每个分区最后读取的消息偏移量会保存到Zookeeper或者Kafka上,这样分区的消费者关闭或者重启,读取状态都不会丢失。

多个消费者可以构成一个消费者群组。怎么构成?共同读取一个主题的消费者们,就形成了一个群组。群组可以保证每个分区只被一个消费者使用。

20251206233441f2c12193d.png

**消费者和分区之间的这种映射关系叫做消费者对分区的所有权关系,很明显,一个分区只有一个消费者,而一个消费者可以有多个分区。**Kafka 区别于其他 MQ 之一

(吃饭的故事:一桌一个分区,多桌多个分区,生产者不断生产消息(消费),消费者就是买单的人,消费者群组就是一群买单的人),一个分区只能被消费者群组中的一个消费者消费(不能重复消费),如果有一个消费者挂掉了<James跑路了>,另外的消费者接上)

Broker和集群

一个独立的Kafka服务器叫Broker。

broker的主要工作是,接收生产者的消息,设置偏移量,提交消息到磁盘保存;为消费者提供服务,响应请求,返回消息。在合适的硬件上,单个broker可以处理上千个分区和每秒百万级的消息量。(要达到这个目的需要做操作系统调优和JVM调优)

多个broker可以组成一个集群。每个集群中broker会选举出一个集群控制器。控制器会进行管理,包括将分区分配给broker和监控broker。

集群里,一个分区从属于一个broker,这个broker被称为首领。但是分区可以被分配给多个broker,这个时候会发生分区复制。

集群中Kafka内部一般使用管道技术进行高效的复制。

202512062334417fea87c61.png

分区复制带来的好处是,提供了消息冗余。一旦首领broker失效,其他broker可以接管领导权。当然相关的消费者和生产者都要重新连接到新的首领上。(详细过程可见下面的选举部分)

保留消息

在一定期限内保留消息是Kafka的一个重要特性,Kafka broker默认的保留策略是:要么保留一段时间(7天),要么保留一定大小(比如1个G)。到了限制,旧消息过期并删除。但是每个主题可以根据业务需求配置自己的保留策略(开发时要注意,Kafka不像Mysql之类的永久存储)。

工作流程

202512062334419dcf98265.png

  • producer先从zookeeper的 "/brokers/.../state"节点找到该partition的leader
  • producer将消息发送给该leader
  • leader将消息写入本地log
  • followers从leader pull消息
  • 写入本地log后向leader发送ACK
  • leader收到所有ISR中的replication的ACK后,增加HW(high watermark,最后commit 的offset)并向producer发送ACK

tips

  • Kafka 中消息是以topic 进行分类的,生产者生产消息,消费者消费消息,都是面向topic的。
  • topic 是逻辑上的概念,而partition 是物理上的概念,每个partition 对应一个log 文件,该log 文件中存储的就是producer 生产的数据。Producer 生产的数据会被不断追加到该log 文件末端,且每条数据都有自己的offset。消费者组中的每个消费者,都会实时记录自己消费到了哪个offset,以便出错恢复时,从上次的位置继续消费。

Kafka的数据模型与消息存储机制

消息存储结构2025120623344134dd7e722.png

Kafka 有 Topic 和 Partition 两个概念,一个 Topic 可以有多个 Partition。在实际存储的时候,Topic + Partition 对应一个文件夹,这个文件夹对应的是这个 Partition 的数据。

在 Kafka 的数据文件目录下,一个 Partition 对应一个唯一的文件夹。如果有 4 个 Topic,每个 Topic 有 5 个 Partition,那么一共会有 4 * 5 = 20 个文件夹。而在 文件夹下,Kafka 消息是采用 Segment File 的存储方式进行存储的。

Segment File 的大概意思是:将大文件拆分成小文件来存储,这样一个大文件就变成了一段一段(Segment 段)。这样的好处是 IO 加载速度快,不会有很长的 IO 加载时间。Kafka 的消息存储就采用了这种方式。

202512062334410ab10142c.png

如上图所示,在一个文件夹下的数据会根据 Kafka 的配置拆分成多个小文件。拆分规则可以根据文件大小拆分,也可以根据消息条数拆分,这个是 Kafka 的一个配置,这里不细说。

在 Kafka 的数据文件夹下,分为两种类型的文件:索引文件(Index File)和数据文件(Data File)。索引文件存的是消息的索引信息,帮助快速定位到某条消息。数据文件存储的是具体的消息内容。

索引文件

索引文件的命名统一为数字格式,其名称表示 Kafka 消息的偏移量。我们假设索引文件的数字为 N,那么就代表该索引文件存储的第一条 Kafka 消息的偏移量为 N + 1,而上个文件存储的最后一条 Kafka 消息的偏移量为 N(因为 Kafka 是顺序存储的)。例如下图的 368769.index 索引文件,其表示文件存储的第一条 Kafka 消息的偏移量为 368770。而 368769 表示的是 0000.index 这个索引文件的最后一条消息。所以 368769.index 索引文件,其存储的 Kafka 消息偏移量范围为 368769-737337。

20251206233441a486133fe.png

索引文件存储的是简单地索引数据,其格式为:「N,Position」。其中 N 表示索引文件里的第几条消息,而 Position 则表示该条消息在数据文件(Log File)中的物理偏移地址。例如下图中的「3,497」表示:索引文件里的第 3 条消息(即 offset 368772 的消息,368772 = 368769+3),其在数据文件中的物理偏移地址为 497。

20251206233441a7c52612e.png

其他的以此类推,例如:「8,1686」表示 offset 为 368777 的 Kafka 消息,其在数据文件中的物理偏移地址为 1686。

数据文件

数据文件的命名格式与索引文件的命名格式完全一样,这里就不再赘述了。

通过上面索引文件的分析,我们已经可以根据 offset 快速定位到某个数据文件了。那接着我们怎么读取到这条消息的内容呢?要读取到这条消息的内容,我们需要搞清楚数据文件的存储格式。

数据文件就是所有消息的一个列表,而每条消息都有一个固定的格式,如下图所示。

2025120623344121db1732c.png

从上图可以看到 Kafka 消息的物理结构,其包含了 Kafka 消息的 offset 信息、Kafka 消息的大小信息、版本号等等。有了这些信息之后,我们就可以正确地读取到 Kafka 消息的实际内容。

Kafka文件存储优势

Kafka运行时很少有大量读磁盘的操作,主要是定期批量写磁盘操作,因此操作磁盘很高效。这跟Kafka文件存储中读写message的设计是息息相关的。Kafka中读写message有如下特点:

写message

  • 消息从java堆转入page cache(即物理内存)。
  • 由异步线程刷盘,消息从page cache刷入磁盘。

读message

  • 消息直接从page cache转入socket发送出去。
  • 当从page cache没有找到相应数据时,此时会产生磁盘IO,从磁 盘Load消息到page cache,然后直接从socket发出去

Kafka高效文件存储设计特点

  • Kafka把topic中一个parition大文件分成多个小文件段,通过多个小文件段,就容易定期清除或删除已经消费完文件,减少磁盘占用。
  • 通过索引信息可以快速定位message和确定response的最大大小。
  • 通过index元数据全部映射到memory,可以避免segment file的IO磁盘操作。
  • 通过索引文件稀疏存储,可以大幅降低index文件元数据占用空间大小。

Kafka 副本同步机制

为保证producer发送的数据,能可靠到指定topic,topic的每个的partition收到 producer发送的数据后,都需要向producer发送 ack(acknowledgement确认收到),如果 producer收到 ack,就会进行下一轮的发送。

20251206233441bd47e5efc.png

ACKS 机制

在 Kafka 中,消息的 ACK(Acknowledgment,确认)机制与生产者的 acks 配置有关。acks 配置表示生产者在接收到消息后等待副本同步确认的方式,具体取值有:

  • acks=0:
    • 意义:生产者在成功将消息发送给 Kafka 服务端后不等待任何确认。
    • 结果:生产者无法知道消息是否成功到达 Kafka 服务器,可能会导致消息的丢失。这种配置下,生产者不会收到任何 ACK。
  • acks=1:
    • 意义:生产者在成功将消息发送给 Kafka 服务端后,等待该分区的首领节点(leader)确认。
    • 结果:生产者会收到分区首领节点的 ACK。这意味着只要分区首领节点成功接收到消息,生产者就会得到确认,而不需要等待其他副本。
  • acks=all 或 acks=-1:
    • 意义:生产者在成功将消息发送给 Kafka 服务端后,等待所有分区副本确认。
    • 结果:生产者会等待分区的所有副本都成功接收到消息并确认。这是最安全的配置,因为只有当所有副本都确认接收到消息后,才认为消息被成功提交。

生产者重试机制:

Kafka 生产者在发送消息后,如果设置了等待服务器的确认(通过 acks 参数配置),会等待一定时间来收到来自服务器的确认(ack)。这个等待时间由 timeout.ms 参数控制,默认是 10000 毫秒(10秒)。

如果在等待时间内没有收到服务器的确认,生产者可以选择重试发送或者处理发送失败的逻辑。这取决于生产者的配置。通常,生产者会根据配置的重试次数和重试间隔来进行重试,以确保消息最终被成功发送。

在 Kafka 的生产者配置中,你可以找到以下与重试相关的配置项:

  • retries: 定义了生产者在发送消息时的最大重试次数。
  • retry.backoff.ms: 定义了两次重试之间的等待时间间隔。

ISR 机制:

Kafka根据副本同步的情况,分成了3个集合

  • AR(Assigned Replicas):包括ISR和OSR
  • ISR(In-sync Replicas):和leader副本保持同步的副本集合,可以被认为是可靠的数据
  • OSR(Out-Sync Replicas):和Leader副本同步失效的副本集合

当 kafka 副本同步机制是所有follower都同步成功才返回 ack 给生产者时,如果有一个follower,因为某种故障,迟迟不能与leader 进行同步,那leader 就要一直等下去,直到它完成同步,才能发送ack。这个问题怎么解决呢?

Leader维护了一个动态的in-sync replica set (ISR-同步副本列表),意为和leader保持同步的follower集合。根据follower发来的FETCH请求中的fetch offset判断ISR中的follower完成数据同步是否成功。如果follower长时间未向leader同步数据,则该follower将被踢出ISR,该时间阈值由replica.lag.time.max.ms参数设定。Leader发生故障之后,就会从ISR中选举新的leader。

  • ISR(In-Sync Replicas ):与leader保持同步的follower集合
  • AR(Assigned Replicas):分区的所有副本
    • ISR是由leader维护,follower从leader同步数据有一些延迟(包括延迟时间replica.lag.time.max.ms和延迟条数replica.lag.max.messages两个维度, 当前最新的版本0.10.x中只支持replica.lag.time.max.ms这个维度),任意一个超过阈值都会把follower剔除出ISR, 存入OSR(Outof-Sync Replicas)列表,新加入的follower也会先存放在OSR中。
    • AR=ISR+OSR。

ISR的伸缩

Kafka在启动的时候会开启两个与ISR相关的定时任务,名称分别为“isr-expiration"和”isr-change-propagation".。isr-expiration任务会周期性的检测每个分区是否需要缩减其ISR集合。这个周期和“replica.lag.time.max.ms”(延迟时间)参数有关。大小是这个参数一半。默认值为5000ms,当检测到ISR中有是失效的副本的时候,就会缩减ISR集合。如果某个分区的ISR集合发生变更, 则会将变更后的数据记录到ZooKerper对应/brokers/topics//partition//state节点中。

Kafka选举机制

在kafka集群中有2个种leader,一种是broker的leader即controller leader,还有一种就是partition的leader。

【Controller leader】

当broker启动的时候,都会创建KafkaController对象,但是集群中只能有一个leader对外提供服务,这些每个节点上的KafkaController会在指定的zookeeper路径下创建临时节点,只有第一个成功创建的节点的KafkaController才可以成为leader,其余的都是follower。当leader故障后,所有的follower会收到通知,再次竞争在该路径下创建节点从而选举新的leader

【Partition leader】

由controller leader执行

  • 从Zookeeper中读取当前分区的所有ISR(in-sync replicas)集合
  • 调用配置的分区选择算法选择分区的leader

kafka中的 zookeeper 起到什么作用

zookeeper 是一个分布式的协调组件,早期版本的kafka用zk做meta信息存储,consumer的消费状态,group的管理以及 offset的值。考虑到zk本身的一些因素以及整个架构较大概率存在单点问题,新版本中逐渐弱化了zookeeper的作用

新的consumer使用了kafka内部的group coordination协议,也减少了对zookeeper的依赖, 但是broker依然依赖于ZK,zookeeper 在kafka中还用来选举controller 和 检测broker是否存活等等。

可以不用zookeeper么?

2.8.0版本以前需借助Zookeeper完成选举投票,后续版本kafka自身已集成该功能。

选举示意图:

LEO 表示 Log End Offset 下一条等待写入的消息的offset(最新的Offset+1。每一个消息都有一个offset ,offset实际也是topic默认50个)

HW 表示 Hign Watermark ISR中最小的LED(因为数据可能没有完全保持同步,所以Consumer最多只能消费到HW之前的位置,比如这里消费到offset5的消息,也就是说其他副本没有同步过去的消息,是不能被消费的)

follower故障

follower发生故障后会被临时踢出ISR,待该follower恢复后,follower会读取本地磁盘记录的上次的HW,并将log文件高于HW的部分截取掉,从HW开始向leader进行同步。

等该follower的LEO大于等于该Partition的HW,即follower追上leader之后,就可以重新加入ISR了。

20251206233441eab29869a.png

选举过程:

故障检测:当 Leader Broker 发生故障或不可用时,Zookeeper 会检测到该变化,并通知相关的 Broker。

选举触发:Zookeeper 会触发 Leader 选举过程,选择一个新的 Leader Broker。

选举算法:Kafka 使用一种基于 Zookeeper 的选举算法(如 Zab 协议)来选择新的 Leader Broker。

Kafka 副本数据一致性

尽管采用 acks = all 但是也会出现 不一致的场景,例如:

:::tips
假设 leader 接受了 producer 传来的数据为 8 条,ISR 中三台 follower(broker0,broker1,broker2)开始同步数据,由于网络传输,另外两台 follower 同步数据的速率不同。当 broker1 同步了 4 条数据,broker2 已经同步了 6 条数据,此时,leader-broker0 突然挂掉,从 ISR 中选取了 broker1 作为主节点,此时 leader-broker1 同步了 4 条,broker2 同步 6,就会造成 leader 和 followe r之间数据不一致问题。

:::

202512062334411993b3620.png

  • HW (High Watermark)俗称高水位,它标识了一个特定的消息偏移量(offset),消费者只能拉取到这个offset之前的消息,对于同一个副本对象而言,其 HW 值不会大于 LEO 值。小于等于 HW 值的所有消息都被认为是“已备份”的(replicated) 。所有分区副本中消息偏移量最小值。
  • LEO(Log End Offset),即日志末端位移(log end offset),记录了该副本底层日志(log)中下一条消息的位移值。注意是下一条消息!也就是说,如果 LEO =8,那么表示该副本保存了 8 条消息,位移值范围是[0, 7]。LEO 的大小相当于当前日志分区中最后一条消息的 offset 值加1,分区 ISR 集合中的每个副本都会维护自身的 LEO,而 ISR 集合中最小的 LEO 即为分区的 HW对消费者而言只能消费 HW 之前的消息

针对不同的产生原因,解决方案不同:

  • 当服务出现故障时:如果是 Follower 发生故障,这不会影响消息写入,只不过是少了一个备份而已。处理 相对简单一点。Kafka 会做如下处理: 20251206233441f1a6cd384.png
    • 将故障的 Follower 节点临时踢出 ISR 集合。而其他 Leader 和 Follower 继续正常接收消息。
    • 出现故障的 Follower 节点恢复后,不会立即加入 ISR 集合。该 Follower 节点会读取本地记录的上一次的 HW,将自己的日志中高于 HW 的部分信息全部删除掉,然后从 HW 开始,向 Leader 进行消息同步。
    • 等到该 Follower 的 LEO 大于等于整个 Partiton 的 HW 后,就重新加入到 ISR 集合中。这也就是说这个 Follower 的消息进度追上了 Leader。
  • 如果是 Leader 节点出现故障,Kafka 为了保证消息的一致性,处理就会相对复杂一点。

202512062334411dcba6255.png

- Leader 发生故障,会从 ISR 中进行选举,将一个原本是 Follower 的 Partition提升为新的 Leader。这时, 消息有可能没有完成同步,所以新的 Leader 的LEO 会低于之前 Leader 的 LEO。 
- Kafka 中的消息都只能以 Leader 中的备份为准。其他 Follower 会将各自的Log 文件中高于 HW 的部分全部 清理掉,然后从新的 Leader 中同步数据。 
- 旧的 Leader 恢复后,将作为 Follower 节点,进行数据恢复。  

如何确保消息不丢失

此问题在RabbitMQ中已详细解答,这里实则类似也就是三端都需确保。

Producer 端

发送数据有 ACK 机制

  • **acks =0:**由于发送后就自认为发送成功,这时如果发生网络抖动, Producer 端并不会校验 ACK 自然也就丢了,且无法重试。
  • **acks = 1:**消息发送 Leader Parition 接收成功就表示发送成功,这时只要 Leader Partition 不 Crash 掉,就可以保证 Leader Partition 不丢数据,但是如果 Leader Partition 异常 Crash 掉了, Follower Partition 还未同步完数据且没有 ACK,这时就会丢数据。
  • **acks = -1 或者 all:**消息发送需要等待 ISR 中 Leader Partition 和 所有的 Follower Partition 都确认收到消息才算发送成功, 可靠性最高。当然万一有副本出问题,这就只能干等了,可以配合 **min.insync.replicas:**设置必须确认的最小同步副本数

KafkaBroker

集群接收到数据后会将数据进行持久化存储到磁盘,为了提高吞吐量和性能,采用的是「异步批量刷盘的策略」,也就是说按照一定的消息量和间隔时间进行刷盘。首先会将数据存储到 「PageCache」 中,至于什么时候将 Cache 中的数据刷盘是由「操作系统」根据自己的策略决定或者调用 fsync 命令进行强制刷盘,如果此时 Broker 宕机 Crash 掉,且选举了一个落后 Leader Partition 很多的 Follower Partition 成为新的 Leader Partition,那么落后的消息数据就会丢失。所以只有配合生产者acks=-1除了leader落盘还有

Comsumer端

  • 手动提交位移:关闭自动提交位移(enable.auto.commit=false),改为手动提交offset。只有在消息成功处理后才提交位移,确保消息不会因消费者故障而丢失。
  • 位移重置策略:通过auto.offset.reset=earliest,确保消费者在重新启动时从最早未消费的消息开始消费,避免消息丢失。
  • 消费者组机制:通过消费者组实现消息的负载均衡,确保消息能够被均匀分配给消费者,避免某些消费者过载导致的消息丢失。

提交offset方式有commitSync()commitAsync()两种方法

附代码:同步提交

java
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.Collections;
import java.util.Properties;

public class KafkaConsumerExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "TianMing-group");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("TianMing-topic"));

        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(1000);
                for (ConsumerRecord<String, String> record : records) {
                    // 处理消息
                    System.out.println("Received message: " + record.value());
                    // 模拟消息处理
                    processMessage(record);
                }
                // 提交偏移量
                consumer.commitSync();
            }
            //为确保可靠性,如果提交失败,可以直接重试或记录后续处理。
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            consumer.close();
        }
    }

    private static void processMessage(ConsumerRecord<String, String> record) {
        // 模拟消息处理逻辑
        System.out.println("Processing message: " + record.value());
    }
}

附代码:异步提交

java
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import java.util.Collections;
import java.util.Map;
import java.util.Properties;

public class KafkaConsumerAsyncExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "TianMing-group");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("TianMing-topic"));

        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(1000);
                for (ConsumerRecord<String, String> record : records) {
                    // 处理消息
                    System.out.println("Received message: " + record.value());
                    // 模拟消息处理
                    processMessage(record);
                }
                // 异步提交偏移量
                consumer.commitAsync(new OffsetCommitCallback() {
                    @Override
                    public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
                        if (exception != null) {
                            System.err.println("Commit failed for offsets: " + offsets);
                            exception.printStackTrace();
                            // 处理提交失败的情况
                        } else {
                            System.out.println("Offsets committed successfully: " + offsets);
                        }
                    }
                });
            }
            //为确保可靠性,如果提交失败,可以在回调中重试提交。
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            consumer.close();
        }
    }

    private static void processMessage(ConsumerRecord<String, String> record) {
        // 模拟消息处理逻辑
        System.out.println("Processing message: " + record.value());
    }
}

当然如何确保不重复消费,这里就不累赘了,可参看链接:RabbitMQ中的处理

生产者消息发送流程

整体推拉流程梳理:

2025120623344191d522b94.png

深入浅出之前 先思考下

物流快递 如何发送?:

1.先预处理快递件,(拦截器)

2.判断车队是否正常,(Producer的初始化)

3.各地区网点派发,(分区器)

4.快递封装打包 (key value 序列化)

5.称重 (序列化大小)

6.分拣员按地址分拣(RecordAccumulator 把消息加入 Batch中)

7.集装箱装机 第一次 和 下一个集装箱满时再来一个 (tryAppend 判断是否为空 )

源码解读:

如果直接从初始化的 KafkaProducer 进入 send 即可推送消息 除了网络通信, 没啥可看

所以我们回顾springboot 自动装配

有一个 spring-boot-autoconfigure 的包 里面的META-INF里的 spring.factories 引入很多的组件。

当有一个KafkaAutoConfiguration条件装配 有KafkaTemplate类 才会去加入相关依赖

java
//条件装配 
@ConditionalOnClass({KafkaTemplate.class})
@EnableConfigurationProperties({KafkaProperties.class})
@Import({KafkaAnnotationDrivenConfiguration.class, KafkaStreamsAnnotationDrivenConfiguration.class})
public class KafkaAutoConfiguration {
    //自动装配 注入模板即可
    @Autowired
    private KafkaTemplate<String,Object> kafkaTemplate;
    @RequestMapping(value = "/get", method = GET)
    @ResponseBody
    public String get(@RequestParam("msg") String value) {
        kafkaTemplate.send("tianmingtest","1",value); //发送消息
        return "success";
    }

然后 发送到 tianmingtest 这个topic上面 ,当然 我们可以去看下配置发送到哪些机器上面。

我这里配置的三台 集群上面

我们回过头看 KafkaAutoConfiguration 关于配置的properties

java
@EnableConfigurationProperties({KafkaProperties.class})
点进去可见  以 spring.kafka开头的  自动会提示
@ConfigurationProperties(
    prefix = "spring.kafka"
)
    进入KafkaProperties的 Producer可见生产者 (或者Consumer消费者的详细配置)
// 比如后面要聊到的一个重点 是否消息发送完毕 的确认机制
private String acks; //分区中成功写入消息的副本数量 0生产端发送立即返回 1副本成功写入无序等待同步其他副本 all需要等leader副本成功并同步其他副本成功写入
private DataSize batchSize;//多少条消息发送一次默认16K
private List<String> bootstrapServers;
private DataSize bufferMemory; // 客户端缓冲区大小默认32M,满了也会触发消息发送
private String clientId;
private String compressionType;//是否对消息进行压缩默认none ,压缩提升吞吐量但会牺牲CPU 有gzip ,snappy lz4 zstd 比例递增
private Class<?> keySerializer = StringSerializer.class;
private Class<?> valueSerializer = StringSerializer.class;
private Integer retries;// 生产端消息发送失败时的重试次数 默认0
private String transactionIdPrefix;
private final Map<String, String> properties = new HashMap();

接下来send 发送的方法:

kafkaTemplate.send("tianmingtest",value);

java

return this.doSend(producerRecord);

  rotected ListenableFuture<SendResult<K, V>> doSend(ProducerRecord<K, V> producerRecord) {
    //先判断是否事务性的 默认false
     if (this.transactional) {
        Assert.state(this.inTransaction(), "No transaction is in process; possible solutions: run the template operation within the scope of a template.executeInTransaction() operation, start a transaction with @Transactional before invoking the template method, run in a transaction started by a listener container when consuming a record");
    }
    //拿到Producer 发送者  进入可见是放到一个 ThreadLocal 里面的如果没有肯定是会去创建 Alt+回退到这里
    Producer<K, V> producer = this.getTheProducer();
   

//拿到Producer生产者
private Producer<K, V> getTheProducer() {
    if (this.transactional) {
        Producer<K, V> producer = (Producer)this.producers.get();
        if (producer != null) {
            return producer;
        } else {
            KafkaResourceHolder<K, V> holder = ProducerFactoryUtils.getTransactionalResourceHolder(this.producerFactory, this.transactionIdPrefix, this.closeTimeout);
            return holder.getProducer();
        }
    } else {
        //创建Producer 的方法
        return this.producerFactory.createProducer(this.transactionIdPrefix);
    }
}

//定位到 createProducer的实现 ,可以看到这是一个工厂 DefaultKafkaProducerFactory
public Producer<K, V> createProducer(@Nullable String txIdPrefixArg) {
    else {
        //这个时候它肯定为空
        if (this.producer == null) {
            synchronized(this) {
                if (this.producer == null) {
                    //进入此CloseSafeProducer 可见他会有一个 delegate /ˈdelɪɡət/ 的委派 那么这个委派是去干嘛呢 这里可见是 创建一个KafkaProducer
                    //有了这个kafkaProducer之后我们回到前面的 发送方法
                    this.producer = new DefaultKafkaProducerFactory.CloseSafeProducer(this.createKafkaProducer());
                }
            }
        }

Producer的send方法可见有三个实现 (首先是CloseSafeProducer 是 DefaultKafkaProducerFactory的)

而这个工厂刚才看过是通过 委派 创建了KafkaProducer 来发送的

20251206233441b6a5e97a1.png

可见第一个 里面实际上用的也是delegate 来发送的 return this.delegate.send(record, callback);

所以我们 重点关注刚才 的KafkaProducer 的send方法 。 这里的代码跟我们的mybatis的二级缓存 相似

java
public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
    //send前先经过拦截器定制化消息的拦截链(
//1.0 引入 发送前的定制化需求。   比如分类统计  修改消息 异常监测 数据加密 字段过滤等 可多个拦截链
//可实现ProducerInterceptor 接口 重写onSend方法定制化消息)
    ProducerRecord<K, V> interceptedRecord = this.interceptors.onSend(record);
    //接下来进入 真正的发送方法
    return this.doSend(interceptedRecord, callback);
}


private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
    TopicPartition tp = null;
    try {
        this.throwIfProducerClosed();
        //1.首先确保该主题topic对应的元数据metadata可用
        KafkaProducer.ClusterAndWaitTime clusterAndWaitTime= this.waitOnMetadata(record.topic(), record.partition(), this.maxBlockTimeMs);
        //2.计算剩余等待时间 
        long remainingWaitMs = Math.max(0L, this.maxBlockTimeMs - clusterAndWaitTime.waitedOnMetadataMs);
        Cluster cluster = clusterAndWaitTime.cluster;
        //3.得到序列化 key:serializedKey  。根据record中topic ,key 利用 valueSerializer得到
        byte[] serializedKey=this.keySerializer.serialize(record.topic(), record.headers(), record.key());
        //4.得到序列化 value:serializedValue   。    根据record中topic ,value 利用 valueSerializer得到
        byte[] serializedValue = this.valueSerializer.serialize(record.topic(), record.headers(), record.value());
        //5.调用partition方法 获得分区号 这个分区可传入也可auto
        int partition = this.partition(record, serializedKey, serializedValue, cluster);
        //6.根据record中的topic和partition构造TopicPartition实例tp
        TopicPartition  tp = new TopicPartition(record.topic(), partition);
        this.setReadOnly(record.headers());
        Header[] headers = record.headers().toArray();
        //7.计算序列化后的 key value及offset size所占大小
        int serializedSize = AbstractRecords.estimateSizeInBytesUpperBound(this.apiVersions.maxUsableProduceMagic(), this.compressionType, serializedKey, serializedValue, headers);
        //8.确保记录大小是否有效
        this.ensureValidRecordSize(serializedSize);
        long timestamp = record.timestamp() == null ? this.time.milliseconds() : record.timestamp();
        this.log.trace("Sending record {} with callback {} to topic {} partition {}", new Object[]{record, callback, record.topic(), partition});
        Callback interceptCallback = new KafkaProducer.InterceptorCallback(callback, this.interceptors, tp);
        if (this.transactionManager != null && this.transactionManager.isTransactional()) {
            this.transactionManager.maybeAddPartitionToTransaction(tp);
        }
        //9.调用append方法添加记录,获得记录添加结果
        RecordAppendResult result = this.accumulator.append(tp, timestamp, serializedKey, serializedValue, headers, interceptCallback, remainingWaitMs);
        //10.根据结果状态判断是否需要 wakeup
        if (result.batchIsFull || result.newBatchCreated) {
            this.log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
            this.sender.wakeup();
        }
        //11.返回
        return result.future;
    } catch

归纳总结关键 步骤 :

确保可用,序列化key value 放到哪个分区

这里有个面试题 怎么找到topic里面的分区的 呢?

对应源码

java
private int partition(ProducerRecord<K, V> record, byte[] serializedKey, byte[] serializedValue, Cluster cluster) {
    Integer partition = record.partition();
    //如果没指定 进入.partion 可见三个实现
    return partition != null ? partition : this.partitioner.partition(record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster);
}
 //默认DefaultPartitioner

1.如果指定了partition分区,就它

2.如果没有指定 三个实现类 实现Partitioner接口,有一个是我自己写的

实际上我们自己也可以写这个实现接口 重写configure 方法来实现自己的分区逻辑(自定义分区器)

202512062334413d11fddf5.png

java
/**
 * 自定义kafka分区主要解决用户分区数据倾斜问题 提高并发效率(假设 3 分区)
 * @param topic 消息队列名
 * @param key 用户传入key
 * @param keyBytes key字节数组
 * @param value 用户传入value
 * @param valueBytes value字节数据
 * @param cluster 当前kafka节点数
 * @return 如果3个节点数 返回 0 1 2 如果5个 返回 0 1 2 3 4 5
 当同一个key的消息会被分配到同一个partition中。消息在同一个partition处理的顺序是FIFO,这就保证了消息的顺序性
 */
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
    // 得到 topic 的 partitions 信息
    List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
    //获取分区总个数
    int numPartitions = partitions.size();
    //如果指定的key数组不为空
    if (keyBytes == null) {
        int nextValue = this.nextValue(topic);
        List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
        //可用分区不为空
        if (availablePartitions.size() > 0) {
            //将消息分配给下一个分区 通过murmur /ˈmɜːrmər/ 的hash算法计算分区分配
            int part = Utils.toPositive(nextValue) % availablePartitions.size();
            return ((PartitionInfo)availablePartitions.get(part)).partition();
            //可用分区为空
        } else {
            //如果指定了key ,对key做hash 分配到指定的分区
            return Utils.toPositive(nextValue) % numPartitions;
        }
    } else {
        //设置了key 则直接给key取模计算分区
        return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
    }

}

然后在properties配置 partitioner.class 即可

注意:自定义分区应用于分布式项目 需要将分区类打包放到lib包 Conusumer通过partition.assignment.strategy=指定

我们回到默认的 DefaultPartitioner.partition方法 看看它是如何发送的。

回顾下这几个参数

java
@param topic 消息队列名
@param key 用户传入key
@param keyBytes key字节数组
@param value 用户传入value
@param valueBytes value字节数据
@param cluster 当前kafka节点数
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
    //获取主题下面的分区 
    List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
    int numPartitions = partitions.size();
    //没有指定key的情况
    if (keyBytes == null) {
        int nextValue = this.nextValue(topic);
        //获取可用分区
        List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
        //如果分区够多,会轮循发送到不同分区
         if (availablePartitions.size() > 0) {
            //用nextvalue(下一条+1)
            int part = Utils.toPositive(nextValue) % availablePartitions.size();
            return ((PartitionInfo)availablePartitions.get(part)).partition();
        } else {
            return Utils.toPositive(nextValue) % numPartitions;
        }
        //key不为空
    } else {
        //根据key取模  
        return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
    }
}

因为连续问就来了:如果设置了key 会怎么去发送?

所以我们第三步得记下:如果key为空会根据给的key取模也就是指定到某个分区 ,不为空的话会拿到可用分区去轮循到不同的分区

那么消息究竟归谁去发送的呢?

KafkaProducer构造函数 可见

java
KafkaProducer(Map<String, Object> configs, Serializer<K> keySerializer, Serializer<V> valueSerializer, ProducerMetadata metadata, KafkaClient kafkaClient, ProducerInterceptors interceptors, Time time) {
    ProducerConfig config = new ProducerConfig(ProducerConfig.addSerializerToConfig(configs, keySerializer, valueSerializer));
    //可见前面判断参数  最后catch异常中间 开启 Sender线程 发送消息
    this.errors = this.metrics.sensor("errors");
     // 关键点在这个  Sender线程
        this.sender = this.newSender(logContext, kafkaClient, this.metadata);
        String ioThreadName = "kafka-producer-network-thread | " + clientId;
        this.ioThread = new KafkaThread(ioThreadName, this.sender, true);
        this.ioThread.start();
        config.logUnused();
        AppInfoParser.registerAppInfo("kafka.producer", clientId, this.metrics, time.milliseconds());
        this.log.debug("Kafka producer started");
        
        
       //再往里面就是 Sender线程  的run 方法 调用的 runOnce方法  最后
       //将需要发送的消息发送给client的待发数据中  (进入可见 判断哪些topic partition 可发 批次)
        long pollTimeout = this.sendProducerData(currentTimeMs);
        //真正NIO发送 Poll 网络模型发送数据
        this.client.poll(pollTimeout, currentTimeMs);

最终 交由 NetworkClient 的 poll方法发送 此处就是网络 IO模型了 。 Selector 。。。。这里就不做深入了。(有需求,可参看网络模型)

基本流程里面到这里就基本结束了

kafka消息拦截链 应用场景。

分类统计 修改消息 异常监测 数据加密 字段过滤等 可多个拦截链

真正发送数据的方法 ?如何自定义选择分区?

java
9.回到KafkaProducer.doSend的 append方法
        RecordAppendResult result = this.accumulator.append(tp, timestamp, serializedKey, serializedValue, headers, interceptCallback, remainingWaitMs);
        
// 1进入RecordAccumulatorde append方法
 public RecordAccumulator.RecordAppendResult append(TopicPartition tp, long timestamp, byte[] key, byte[] value, Header[] headers, Callback callback, long maxTimeToBlock) throws InterruptedException {
    this.appendsInProgress.incrementAndGet();
    ByteBuffer buffer = null;
    if (headers == null) {
        headers = Record.EMPTY_HEADERS;
    }

    RecordAccumulator.RecordAppendResult var19;
    try {
        //2 根据主题与分区号获取相对应的双端队列
        Deque<ProducerBatch> dq = this.getOrCreateDeque(tp);
        // 锁定上一步返回的ArrayDeque
        synchronized(dq) {
            if (this.closed) {
                throw new KafkaException("Producer closed while send in progress");
            }
            //3 尝试拼接消息 并返回RecordAppendResult
            RecordAccumulator.RecordAppendResult appendResult = this.tryAppend(timestamp, key, value, headers, callback, dq);
            if (appendResult != null) {
                RecordAccumulator.RecordAppendResult var14 = appendResult;
                return var14;
            }
        }

     。。。。
}

最终的 tryAppend 方法很多判断拼接

java

private RecordAccumulator.RecordAppendResult tryAppend(long timestamp, byte[] key, byte[] value, Header[] headers, Callback callback, Deque<ProducerBatch> deque) {
    //ProducerBatch.peekLast 从ArrayDeque双端队列中取最后一个 ProducerBatch
    ProducerBatch last = (ProducerBatch)deque.peekLast();
    //是否为空
    if (last != null) {
        //尝试拼接写入当前批次
        FutureRecordMetadata future = last.tryAppend(timestamp, key, value, headers, callback, this.time.milliseconds());
        // 写入结果是否为空
         if (future != null) {
            return new RecordAccumulator.RecordAppendResult(future, deque.size() > 1 || last.isFull(), false);
        }
        //关闭
        last.closeForRecordAppends();
    }
    return null;
}

同步代码块中尝试去做一次追加操作tryAppend(),如果成功就直接返回追加的结果对象。tryAppend方法中逻辑是:

a)如果dq中有ProducerBatch则往新一个batch中追加

a.1)追加不成功,说明最新的batch空间不足,返回null。需要外层逻辑创建新的batch

a.2)追加成功,返回RecordAppendResult

b)dq中无producerBatch,返回null,代表没有能追加成功。

第一个到达此方法的线程肯定是返回了null,因为还没有消息累积进来,也不存在ProducerBatch对象。

如果tryAppend返回null,说明没能直接在现有的batch上追加成功(也可能还没有batch),此时需要初始化新的ProducerBatch

总结一下:

1、RecordAccumulator使用ProducerBatch缓存消息。每个主题分区拥有一个ProducerBatch的队列。

2、当ProducerBatch队列的队尾batch不能再容纳新消息时,对其进行封箱操作,同时新建ProducerBatch放入队尾来存放新消息。

3、ProducerBatch对消息追加的操作都是通过MemoryRecordsBuilder进行的。消息最终被追加到MemoryRecordsBuilder中的DataOutputStream appendStream中

最后再给你们上个图吧。(图太大,看关键部分吧)

20251206233441d33f88a55.png

既然已经深入源码解读了 发送消息流程,当然少不了对应的消费流程了。

消费者消费消息流程

同样的我们也先深入浅出滴思考下

物流快递 如何收件?:

1.监听物流中心 (处理登记处ListenerContainer和实例化Listener)

2.分发各个快递员 (将ListenerConsumer添加到Executor线程池)

3.开始按批次取件 (启动线程run方法pollAndInvoke拉取消息)

4.送货上门 (返回InvokeIfHaveRecords(recods))

源码解读

如果直接从初始化的 KafkaConsumer 进入 poll 即可拉取消息 除了网络通信, 没啥可看

所以 首先我们应该已经知道消费者 comsumer 会在一个 group 里面,然后消费一个topic

怎么指定的呢 @KafkaListener(topics = "tianmingtest",groupId = "testgroup")

java
/**
 * 消费者
 */
@Component
public class ConsumerListener {
    //指定订阅的 topic 以及归属的group
    @KafkaListener(topics = "tianmingtest",groupId = "testgroup")
    public void onMessage(String msg){
        System.out.println("----收到消息:" + msg + "----");
    }
}

那我们一个注解 如何 起效果的呢 ? 入口类从这里开始

KafkaListenerAnnotationBeanPostProcessor

实现了 BeanPostProcessor 接口(讲Spring的时候还有印象没 ? Bean生命周期 初始化Bean前后的回调的两个方法),

主要开始流程在 postProcessAfterInitialization 后置处理器中:对象实例化之后调用

(初始化回调之前的方法before先不用管)

java
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
        if (!this.nonAnnotatedClasses.contains(bean.getClass())) {
        Class targetClass = AopUtils.getTargetClass(bean);
        //是否有类添加KafkaListener注解
        Collection  classLevelListeners =  this.findListenerAnnotations(targetClass);
        boolean hasClassLevelListeners = classLevelListeners.size() > 0;
        List<Method> multiMethods = new ArrayList();
        //是否有方法添加 KafkaListener 注解,并与方法建立隐射关系
        Map<Method, Set<KafkaListener>> annotatedMethods = MethodIntrospector.selectMethods(targetClass, (methodx) -> {
            Set<KafkaListener> listenerMethods = this.findListenerAnnotations(methodx);
            return !listenerMethods.isEmpty() ? listenerMethods : null;
        });
        //如果KafkaListener在类级别上
        if (hasClassLevelListeners) {
            //去找 KafkaHandler的方法注解 (如果类上被 KafkaListener修饰,那么类方法必须要使用 KafkaHandle配合实现)
            Set<Method> methodsWithHandler = MethodIntrospector.selectMethods(targetClass, (methodx) -> {
                return AnnotationUtils.findAnnotation(methodx, KafkaHandler.class) != null;
            });
            multiMethods.addAll(methodsWithHandler);
        
        } else {
            Iterator var13 = annotatedMethods.entrySet().iterator();
            
            // 遍历annotatedMethods  根据隐射关系执行 processKaListener
            while(true) {
                if (!var13.hasNext()) {
                    this.logger.debug(() -> {
                        return annotatedMethods.size() + " @KafkaListener methods processed on bean '" + beanName + "': " + annotatedMethods;
                    });
                    break;
                }
                 Entry<Method, Set<KafkaListener>> entry = (Entry)var13.next();
                    Method method = (Method)entry.getKey();
                    Iterator var11 = ((Set)entry.getValue()).iterator();
            
                    while(var11.hasNext()) {
                        KafkaListener listener = (KafkaListener)var11.next();
                        // 注解在方法上的逻辑 
                        this.processKafkaListener(listener, method, bean, beanName);
                    }
                }
            }
                // 处理类级别的 ClassLevelListeners 和 multiMethods隐射关系
                  if (hasClassLevelListeners) {
                    this.processMultiMethodListeners(classLevelListeners, multiMethods, bean, beanName);
                }
            }
            
            return bean;

进入 processKafListener 方法 可见 设置 endpoint 信息 和 this.processListener方法 进入可见 registerEndpoint 方法 。

进入 registerEndpoint 方法可见 startImmediately /ɪˈmiːdiətli/ 默认false 所以 将信息add 到 List< KafakaListenerEndpointDescriptor>endpointDescriptors List集合中

那真正创建 这些Listner的逻辑 在哪里呢?

真正创建MessageListenerContainer的地方是在KafkaListenerEndpointRegistry中,因为 最初的

KafkaListenerAnnotationBeanPostProcessor类的 单列实例化 之后 调用的方法afterSingletonsInstantiated 最后

this.registrar.afterPropertiesSet(); 里面 调用

的 registerAllEndpoints()

最终调用到了KafkaListenerEndpointRegistrar的 registerListenerContainer

此方法

java
public void registerListenerContainer(KafkaListenerEndpoint endpoint, KafkaListenerContainerFactory<?> factory, boolean startImmediately) {
    Assert.notNull(endpoint, "Endpoint must not be null");
    Assert.notNull(factory, "Factory must not be null");
    String id = endpoint.getId();
    Assert.hasText(id, "Endpoint id must not be empty");
    synchronized(this.listenerContainers) {
        Assert.state(!this.listenerContainers.containsKey(id), "Another endpoint is already registered with id '" + id + "'");
        //创建 MessageListenerContainer 实例
        MessageListenerContainer container = this.createListenerContainer(endpoint, factory);
        this.listenerContainers.put(id, container);
        ConfigurableApplicationContext appContext = this.applicationContext;
        String groupName = endpoint.getGroup();
        if (StringUtils.hasText(groupName) && appContext != null) {
            Object containerGroup;
            ContainerGroup group;
            if (appContext.containsBean(groupName)) {
                containerGroup = (List)appContext.getBean(groupName, List.class);
                group = (ContainerGroup)appContext.getBean(groupName + ".group", ContainerGroup.class);
            } else {
                containerGroup = new ArrayList();
                appContext.getBeanFactory().registerSingleton(groupName, containerGroup);
                group = new ContainerGroup(groupName);
                appContext.getBeanFactory().registerSingleton(groupName + ".group", group);
            }
            // 加入分组
            ((List)containerGroup).add(container);
            group.addContainers(new MessageListenerContainer[]{container});
        }
        // 默认false  所以启动时 暂时不会执行start方法,会先交给 SmartLifecycle 接口处理
        // 这里KafkaListenerEndpointRegistry类实现了此接口,会在项目所有bean加载和初始化完毕之后执行start方法
        if (startImmediately) {
            this.startIfNecessary(container);
        }

    }
}

进入 this.startIfNecessay 实际用的参数的start方法 再进入 方法参数 MessageListenerContainer 接口的实现类 AbstractMessageListenerContainer 的 start 方法

发现是一个abstract的 doStart 方法 ,说明实现是交给它的子类去实现

我们来到 它的子类 KafkaMessageListenerContainer 的 doStart 方法

java
protected void doStart() {
    if (!this.isRunning()) {
        if (this.clientIdSuffix == null) {
            this.checkTopics();
        }

        ContainerProperties containerProperties = this.getContainerProperties();
        this.checkAckMode(containerProperties);
        Object messageListener = containerProperties.getMessageListener();
        AsyncListenableTaskExecutor consumerExecutor = containerProperties.getConsumerTaskExecutor();
        if (consumerExecutor == null) {
            consumerExecutor = new SimpleAsyncTaskExecutor((this.getBeanName() == null ? "" : this.getBeanName()) + "-C-");
            containerProperties.setConsumerTaskExecutor((AsyncListenableTaskExecutor)consumerExecutor);
        }

        GenericMessageListener<?> listener = (GenericMessageListener)messageListener;
        ListenerType listenerType = this.determineListenerType(listener);
        // 实例化一个listenerConsumer.
        // 进入这个属性 listenerConsumer可见实现了 SchedulingAwareRunnable 说明是个线程 肯定 runabler接口 run方法 这个我们一会进去瞧瞧
        this.listenerConsumer = new KafkaMessageListenerContainer.ListenerConsumer(listener, listenerType);
        this.setRunning(true);
        this.startLatch = new CountDownLatch(1);
        //将 listenerConsumer 添加到线程池 Executor
        this.listenerConsumerFuture = ((AsyncListenableTaskExecutor)consumerExecutor).submitListenable(this.listenerConsumer);
        try {
            if (!this.startLatch.await(containerProperties.getConsumerStartTimeout().toMillis(), TimeUnit.MILLISECONDS)) {
                this.logger.error("Consumer thread failed to start - does the configured task executor have enough threads to support all containers and concurrency?");
                 this.publishConsumerFailedToStart();
            }
        } catch (InterruptedException var7) {
            Thread.currentThread().interrupt();
        }

    }
}

接下来 应该知道要到哪里了吧 。 肯定是 run方法啊 。 来到 listenerConsumer 的run方法

这个里面我们能 最终看到 消费端获取消息采用的 poll方式 ,一次性拿多条数据

java
public void run() {
    ListenerUtils.setLogOnlyMetadata(this.containerProperties.isOnlyLogRecordMetadata());
    KafkaMessageListenerContainer.this.publishConsumerStartingEvent();
    this.consumerThread = Thread.currentThread();
    this.setupSeeks();
    KafkaUtils.setConsumerGroupId(this.consumerGroupId);
    this.count = 0;
    this.last = System.currentTimeMillis();
    this.initAssignedPartitions();
    KafkaMessageListenerContainer.this.publishConsumerStartedEvent();
    Object exitThrowable = null;
    //循环调用
    while(KafkaMessageListenerContainer.this.isRunning()) {
        try {
            // poll 拉取数据  , 消费端获取消息采用的 poll方式 ,一次性拿多条数据 
            // 进入 可见封装成 ConsumerRecords 消息对象
            this.pollAndInvoke();
        } catch (NoOffsetForPartitionException var12) {
     ....
            this.logger.error(var16, "Stopping container due to an Error");
            this.wrapUp(var16);
            throw var16;
        } catch (Exception var17) {
            this.handleConsumerException(var17);
        } finally {
            this.clearThreadState();
        }
    }

    this.wrapUp((Throwable)exitThrowable);
}

进入 pollAdnInvoke 方法 最后 invokeIfHaveRecords > invokeListener(records) > 判断是否批量消费 invokeBatchListener / invokeRecordListener

可以配置参数:concurrency 控制listener 的线程数量 ,并发开关可以通过batchListener = true 开启 配合max_poll_records_config****=50 多少条一次poll返回 也可以配置间隔时间tnterval.ms 间隔多久poll一次 最多多少条

java
# 如果有 两个方法标记 @KafkaListener 会启动 2 * 3 个Consuumer线程 ,6个Listener线程
#spring.kafka.listener.concurrency=3

20251206233441bf0af4abe.png

默认 doInvokeWithRecords 方法

是否开启事务模板 invokeRecordListenerInTx 和 doInvokeWithRecords

默认 > doInvokeRecordListener > invokeOnMessage > doInvokeOnMessage

四种type 类型的listener . 提供不同的接口处理

1。 Simple 不考虑提交偏移量和consumer对象 ;

2。Acknowledgine 需要手动提交时 而不是自动提交或者 spring-kafka自己实现提交方式时,需要如下接口中acknowledment 的 acknowlegge()方法提交偏移量

3.consumer_aware 类似SpringIOC的 ApplicationContextAware功能,如果消费消息时,需要用consumer对象,则使用这个类型

4.ACKNOWLEDGING_CONSUMER_AWARE,同时支持ACKNOWLEDGING和CONSUMER_AWARE两种类型

这些类型由 接口GenericMessageListener接口具体哪个实现类来决定

最终 > onMessage 返回消息

总结一下:

1.入口KafkaListenerAnnotationBeanPostProcessor,实现了 BeanPostProcessor等接口,所以Bean加载完后会进入postProcessAfterInitialization方法

2.找到有@KafkaListener注解的类或者方法,并且将注解的信息封装成MethodKafkaListenerEndpoint 进入processListener中registerEndpoint方法,将数据封装到 KafkaListenerEndpointDescriptor,并且添加到List

3.KafkaListenerAnnotationBeanPostProcessor 实现了SmartInitializingSingleton,在当所有单例 bean 都初始化完成以后,会调 用afterSingletonsInstantiated方法

4.在afterSingletonsInstantiated方法中调用 registerListenerContainer方法,将消费者信息封装到 listenerContainers中

5.KafkaListenerEndpointRegistry类实现了SmartLifecycle接口,所以会容器启动调用start方法 8.进入AbstractMessageListenerContainer的start方法 发现静态方法交给子类实现

6.进入KafkaMessageListenerContainer的doStart方法

7.实例化一个listenerConsumer,listenerConsumer实现了SchedulingAwareRunnable。SchedulingAwareRunnable继承了runnable,并将listenerConsumer添加至线程池

8.进入listenerConsumer的run方法,循环调用this.pollAndInvoke();

9.pollAndInvoke中调用this.doPoll();获取信息

最后再给你们上个图吧。(图太大,看关键部分吧)

20251206233441db2e87be9.png

理论上解下来要深入的应该是上图中的epoll了。但实际上具备上述源码已经能吊打面试官了,后续就不解读了。

有这一块诉求和能力的给你开个门吧:

Sender.run() > 注册到一个Selector Selector.send() 注册channel 相当于MainReactor

实际IO操作 channel.finishConnect()

然后多个处理线程 KafkaChannel.setSend() 相当于多个SubReactor

实际IO操作 SocketChannel channel = serverSoceketChannel.accept();

真正处理的 TransportLayer.addInterestOps() 最后 SelectionKey.interestOps();

进入nioInterestOps(key.interestOps() | SelectionKey.OP_Read或Write) 相当于 Work线程

实际IO操作在Channel.write / read

实际底层 发送的是包装Channel的 NetworkSend

谁告诉你解决 Kafka 消息积压增加消费者数就可以了?

在 Kafka 的日常使用和运维过程中,消息积压几乎是每个开发者都可能遇到的问题。网上流行的一条“金句”是:消息积压了?赶紧加消费者啊!

可惜,现实往往没那么简单 —— 只靠增加消费者实例,很多情况下并不能解决积压问题,甚至可能浪费资源。

本文带你理清 Kafka 消费原理,搞明白“增加消费者数”到底有没有用,什么时候有用,什么时候没用,以及真正有效的解决 Kafka 消息积压的方法。


一、Kafka 消费并发的本质

20251206233441f7a5541d1.png

Kafka 的并发消费能力,决定于两个核心指标:

  • 分区数(Partition 数量):Kafka 主题下的分区数决定了并行消费的上限。
  • 消费者实例数(Consumer Instance 数量):同一个消费组下,真正“在工作”的实例不能超过分区数。

并发度 = min(分区数, 消费者实例数)


二、为什么只加消费者没用?

举个例子:

  • 你的 topic 有 5 个分区
  • 消费组里原本有 3 个实例,发现消费不过来了
  • 你一下子加到 10 个消费者实例

你以为消费就能提速了,但事实上只有 5 个消费者在真正消费,其它 5 个处于“闲着,领不到分区”的状态,看着队列干着急……

原因很简单:每个分区同一时间只能被一个组内消费者消费,分区是并发的上限。


三、盲目加实例,代价是什么?

  • 浪费计算资源:无谓的实例占用机器、内存,增加管理复杂度
  • 业务无改善:积压依然看着堆在 broker 上,不动如山
  • 还有可能“抖动”:消费者组频繁重平衡,反而影响稳定性

四、什么情况下加消费者有效?

  • 你的消费者数远小于分区数:此时提升实例,可以提升并发
  • 分区数>实例数:比如有 20 分区,只有 5 个消费者,多加几个效果立竿见影

五、想解决积压,正确姿势是什么?

  1. 诊断瓶颈
    • 是消费者慢?业务处理慢?还是分区数就不够?
  2. 优化消费逻辑
    • 去掉慢 SQL、大文件操作、同步调用,能异步别同步,能批量别单条
  3. 检查分区数
    • 分区太少限制并发,可以考虑增加分区(注意分区扩容带来的乱序等问题)
  4. 合理扩展消费者
    • 在分区数支撑前提下,加消费者才有用
  5. 监控和预警
    • 持续关注 Lag 指标,及时发现和响应

六、总结

Kafka 积压不是简单“加人就能扛”的搬砖活。

请牢记:

“Kafka 消费能力的天花板由分区数决定,盲目的增加消费者并不能突破物理并发的上限。”

碰到消息积压,别急着加人头,先搞懂原理,再下药,别做无用功!

更新: 2025-04-29 15:39:27
原文: https://www.yuque.com/tulingzhouyu/db22bv/bubwm3kyevkt68m8